package com.uxsino.reactorq.event;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import javax.jms.JMSException;

import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.uxsino.reactorq.commons.ReactorQFactory;
import com.uxsino.reactorq.commons.TopicFlux;

/**
 * receive and dispatch events from {@link TopicFlux}. see {@link IEvent} {@link Event} 
 * 
 *
 */
public class EventSource {
    private static Logger logger = LoggerFactory.getLogger(EventSource.class);

    // the flux factory
    private ReactorQFactory rqFactory;

    /**
     * create an event source
     * @param rqFactory the flux factory object. see {@link ReactorQFactory}
     */
    public EventSource(ReactorQFactory rqFactory) {
        this.rqFactory = rqFactory;
    }

    private Map<String, TopicFlux<?>> fluxes = new HashMap<>();

    private class SubscribeEntry {
        public String eventName;

        @SuppressWarnings("unused")
        public Class<? extends Event> cls;

        public Consumer<IEvent> action;

        public Map<String, String> filterValues = new HashMap<>();

        public SubscribeEntry(Class<? extends Event> eventClass, Consumer<IEvent> subscriber) {
            this.eventName = Event.getEventName(eventClass);
            cls = eventClass;
            action = subscriber;
        }

        @SuppressWarnings("unused")
        public void addFilter(String propName, String propExpr) {
            filterValues.put(propName, propExpr);
        }

    }

    private List<SubscribeEntry> subscribers = new ArrayList<>();

    /**
     * subscribe an event type by given {@link Subscriber}
     * @param eventClass the event class
     * @param receiverId if given, only message with property 'receiverId' set to this receiverId will be accepted.
     * ignoring other messages
     * @param subscriber the subscriber
     * @return
     */
    public SubscribeEntry subscribeEvent(Class<? extends Event> eventClass, String receiverId,
        Subscriber<IEvent> subscriber) {
        return subscribeEvent(eventClass, receiverId, v -> subscriber.onNext(v));
    }

    /**
     * subscribe an event type by given {@link Consumer}
     * @param eventClass the event class
     * @param receiverId if given, only message with property 'receiverId' set to this receiverId will be accepted.
     * ignoring other messages
     * @param subscriber the subscriber
     * @return
     */
    public SubscribeEntry subscribeEvent(Class<? extends Event> eventClass, String receiverId,
        Consumer<IEvent> subscriber) {

        String topicName = getEventTopicName(eventClass);
        if (!fluxes.containsKey(topicName)) {
            TopicFlux<Event> flux = null;
            try {
                flux = rqFactory.<Event> createTopicFlux(topicName, eventClass, receiverId);
                flux.onPostDecode(this::postEventDecode);
            } catch (JMSException e) {
                logger.error("cannot connect to activemq broker, try again later.");
            }
            fluxes.put(topicName, flux);
            if (flux != null) {
                flux.subscribe(this::dispatch);
            }

        }

        SubscribeEntry entry = new SubscribeEntry(eventClass, subscriber);
        subscribers.add(entry);
        return entry;

    }

    /**
     * get the replyTo from Message and put into Event
     * 
     * @param msg
     * @param event
     */
    private void postEventDecode(javax.jms.Message msg, Event event) {
        try {
            event.replyTo = msg.getJMSReplyTo();
        } catch (JMSException e) {
            logger.error("can not get reply to?? {}", e);
        }

    }

    private void dispatch(Event event) {
        for (SubscribeEntry entry : subscribers) {
            if (entry.eventName.equals(event.getEventName())) {
                entry.action.accept(event);
            }
        }
    }

    private String getEventTopicName(Class<? extends Event> eventClass) {
        return "simo-event-" + Event.getEventName(eventClass);

    }

}
